Run the cudf-polars test suite against DaskEngine and RayEngine #22381

rapids-bot[bot] merged 28 commits into rapidsai:main
```yaml
# (rapidsmpf compatibility already validated in rapidsmpf CI)
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: pull-request
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
```
These changes are necessary to provide enough resources for UCX. For reference, we already do the same for UCXX and RapidsMPF, both of which need it for the same reason.
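As an aside, the `matrix_filter` line in that diff is a jq program. A rough Python equivalent, purely as an illustrative sketch (the real filter runs as jq in CI):

```python
# Rough Python rendering of the jq filter: keep amd64 entries, group by CUDA
# major version, and keep the entry with the highest (PY_VER, CUDA_VER) per group.
from itertools import groupby

def version_tuple(version: str) -> tuple[int, ...]:
    # "12.4" -> (12, 4)
    return tuple(int(part) for part in version.split("."))

def filter_matrix(matrix: list[dict]) -> list[dict]:
    # map(select(.ARCH == "amd64"))
    amd64 = [entry for entry in matrix if entry["ARCH"] == "amd64"]
    # jq's group_by sorts by the grouping key first, so sort before groupby
    amd64.sort(key=lambda entry: version_tuple(entry["CUDA_VER"])[0])
    groups = groupby(amd64, key=lambda entry: version_tuple(entry["CUDA_VER"])[0])
    # map(max_by([PY_VER, CUDA_VER])) within each CUDA-major group
    return [
        max(group, key=lambda e: (version_tuple(e["PY_VER"]), version_tuple(e["CUDA_VER"])))
        for _, group in groups
    ]
```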
The ConditionalJoin lowering only triggered the multi-partition fallback
when the smaller side had more than one partition, missing asymmetric
cases such as (left=2, right=1).
In those cases no Repartition was inserted, so on multi-rank engines the
single right partition only existed on one rank. Peer ranks executed the
conditional join against an empty right side and silently dropped rows.
For example, with num_ranks=2 a join_where producing 90 rows on CPU
returned only 54 rows on RayEngine, missing the 36 rows from rank 1.
Fix this by gating the fallback on output_count > 1, i.e.
max(left_count, right_count) > 1. This ensures the smaller side is
repartitioned into a single broadcastable partition whenever either
input is multi-partition. The single-partition equi-case remains
unchanged.
Surfaced while enabling DaskEngine and RayEngine in the cudf-polars test
fixture matrix. The failing cases:
test_join_conditional[{dask,ray}-9-{True,False}]
reported:
DataFrames are different (height mismatch)
[left]: 90
[right]: 54
With this fix all 16 cases pass (4 engines × 2 reverse × 2
max_rows_per_partition), and SPMD now correctly emits the fallback
warning for max_rows_per_partition=9.
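A toy illustration of the predicate change (hypothetical counts, not cudf-polars API calls):

```python
# The asymmetric case from above: left has 2 partitions, right has 1.
left_count, right_count = 2, 1
output_count = max(left_count, right_count)

old_trigger = min(left_count, right_count) > 1  # old gate: smaller side is multi-partition
new_trigger = output_count > 1                  # new gate: either side is multi-partition

# The old gate misses this case, so no Repartition was inserted.
assert (old_trigger, new_trigger) == (False, True)
```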
```diff
 fallback_msg = "ConditionalJoin not supported for multiple partitions."
 if left_count < right_count:
-    if left_count > 1 or dynamic_planning:
+    if output_count > 1 or dynamic_planning:
```
Fixed a bug in the conditional-join fallback logic. We need to repartition the smaller table to a single broadcastable partition whenever either side has more than one partition.
Previously the fallback only triggered when the smaller side itself had multiple partitions, which breaks asymmetric cases like (left=2, right=1). In that situation the single right partition only exists on one rank, so peer ranks execute the conditional join against an empty right side and silently drop rows.
@TomAugspurger and @rjzamora, does that sound correct?
I'll defer to Rick on this...
Though if the new way is correct, then maybe the logic would be a bit more obvious if we combine the first two if conditions?
```python
if (left_count < right_count) and (output_count > 1 or dynamic_planning):
    left = Repartition(left.schema, left)
    pi_left[left] = PartitionInfo(count=1)
    _fallback_inform(fallback_msg, config_options)
elif output_count > 1 or dynamic_planning:
    right = Repartition(right.schema, right)
    pi_right[right] = PartitionInfo(count=1)
    _fallback_inform(fallback_msg, config_options)
```

Then the repeated condition will maybe be a bit clearer (that's true for `left_count < right_count`, but the second condition `output_count > 1 or dynamic_planning` isn't).
At least I hope those are logically equivalent.
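A quick brute-force check (a sketch; `trigger` stands in for `output_count > 1 or dynamic_planning`) suggests they are:

```python
from itertools import product

def nested(smaller_is_left: bool, trigger: bool) -> str | None:
    # Original shape: branch on which side is smaller, then test the trigger.
    if smaller_is_left:
        return "repartition-left" if trigger else None
    return "repartition-right" if trigger else None

def combined(smaller_is_left: bool, trigger: bool) -> str | None:
    # Suggested shape: fold the trigger into the first branch.
    if smaller_is_left and trigger:
        return "repartition-left"
    if trigger:
        return "repartition-right"
    return None

assert all(nested(s, t) == combined(s, t) for s, t in product([True, False], repeat=2))
```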
I guess we are maybe disabling dynamic planning for some join tests? By default, dynamic-planning is always on, so we always repartition the smaller side.
> I guess we are maybe disabling dynamic planning for some join tests? By default, dynamic-planning is always on, so we always repartition the smaller side.

Maybe, it only happens when running with multiple GPUs.
> Though if the new way is correct, then maybe the logic would be a bit more obvious if we combine the first two if conditions?
Yes, and we can even simplify a bit more. Done.
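One possible shape of that simplification, as a sketch reusing the names from the suggestion above (the merged code may differ):

```python
# Hoist the shared trigger; only the choice of which side to broadcast stays nested.
if output_count > 1 or dynamic_planning:
    if left_count < right_count:
        left = Repartition(left.schema, left)
        pi_left[left] = PartitionInfo(count=1)
    else:
        right = Repartition(right.schema, right)
        pi_right[right] = PartitionInfo(count=1)
    _fallback_inform(fallback_msg, config_options)
```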
bdice left a comment
Approving CI and packaging!
It looks like the … Part of that is because we're running these tests against multiple versions of polars. I think we could get away with running the full suite of tests (all tests, all engines) against only the latest version of polars. But I also wonder whether we're still unexpectedly creating expensive resources like Ray actors in too many places.
pentschev left a comment
Left a couple of comments but I won't block this PR. I don't think it's wise for us to hardcode test details to fit CI; rather, we should make them configurable and make CI fit that instead.
I think this was just a slower runner. Note that its sibling job only took 16 minutes. In fact, I do not even think …
CodeRabbit walkthrough

This PR extends cudf-polars' test infrastructure to support multi-rank Ray streaming execution alongside SPMD. Ray is added as an optional dependency, container runtime options are configured for CI, and tests are refactored to use conditional fixtures and warning helpers that adapt behavior based on engine type and rank configuration.

Changes:
- Ray Dependency & Infrastructure Setup
- Test Suite Refactoring for Multi-Rank Support
mroeschke left a comment
Just one comment about adding a run_constraints for ray in the conda recipe.
```yaml
common:
  - output_types: [conda, requirements, pyproject]
    packages:
      - ray>=2.55.1
```
In conda/recipes/cudf-polars/recipe.yaml, we'll probably want to add a

```yaml
run_constraints:
  - ray >=2.55.1
```

to mirror this constraint.
Ray is still optional; this is just for the CI testing.
Right, IIRC run-constraints is like specifying a version constraint for optional runtime dependencies (like what got added to the pyproject.toml)
https://rattler-build.prefix.dev/latest/reference/recipe_file/#run-constraints
> Packages that are optional at runtime but must obey the supplied additional constraint if they are installed.
But can be done in a follow up since the CI is currently green
Ah, I didn't know that, thanks.
But yes, let's do it in a follow-up PR.
/merge
Run the cudf-polars test suite against DaskEngine and RayEngine (rapidsai#22381)

Builds on the cached `streaming_engines` fixture from rapidsai#22364, which amortizes SPMD bootstrap via `_reset()`, and extends the same pattern to Dask and Ray.

With this change, the test matrix runs against:

`["in-memory", "spmd", "spmd-small", "dask", "ray"]`

subject to package availability and `rrun` gating.

We might change the different setups later, but for now CI runs:

| Engine | Block Size(s) | GPU Configuration |
|----------------|-----------------------|-------------------|
| `SPMDEngine` | `"medium"`, `"small"` | Single GPU |
| `DaskEngine` | `"medium"` | Single GPU |
| `RayEngine` | `"medium"` | Two GPUs |

Authors:
- Mads R. B. Kristensen (https://github.com/madsbk)
- Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
- Matthew Murray (https://github.com/Matt711)
- Bradley Dice (https://github.com/bdice)
- Peter Andreas Entschev (https://github.com/pentschev)
- Matthew Roeschke (https://github.com/mroeschke)

URL: rapidsai#22381
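For illustration, a hedged sketch of what a parametrized engine fixture along these lines could look like (`streaming_engines` is named in the PR; the body here is hypothetical, not the actual cudf-polars test code):

```python
import pytest

ENGINES = ["in-memory", "spmd", "spmd-small", "dask", "ray"]

@pytest.fixture(params=ENGINES)
def streaming_engines(request):
    name = request.param
    if name == "dask":
        pytest.importorskip("dask")  # skip if the optional package is absent
    elif name == "ray":
        pytest.importorskip("ray")   # Ray stays an optional dependency
    return name
```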